---
title: "AgUiClient"
description: "Main client class for AG-UI server connectivity"
---

# AgUiClient

The `AgUiClient` class is the primary interface for connecting to AG-UI compatible servers. It handles HTTP communication, SSE streaming, and binary protocol encoding/decoding, and it provides a type-safe API for agent interactions.

## Constructor

```dart
AgUiClient({
  required AgUiClientConfig config,
  http.Client? httpClient,
  Encoder? encoder,
  Decoder? decoder,
})
```

### Parameters

- `config` (required): Configuration object with server details
- `httpClient` (optional): Custom HTTP client implementation
- `encoder` (optional): Custom encoder for request serialization
- `decoder` (optional): Custom decoder for response parsing

## Methods

### runAgent

Executes an agent and returns a stream of decoded events.

```dart
Stream<BaseEvent> runAgent(
  String agentId,
  RunAgentInput input, {
  Map<String, String>? headers,
})
```

#### Parameters

- `agentId`: Unique identifier for the agent
- `input`: Agent input containing messages, context, and configuration
- `headers`: Optional additional headers for this request

#### Returns

A `Stream<BaseEvent>` that emits protocol events as they arrive.

#### Example

```dart
final input = SimpleRunAgentInput(
  messages: [
    UserMessage(id: 'msg_1', content: 'Hello, agent!'),
  ],
  context: {'sessionId': '12345'},
);

await for (final event in client.runAgent('chat-agent', input)) {
  switch (event) {
    case RunStartedEvent():
      print('Agent started');
    case TextMessageDeltaEvent(delta: final text):
      print('Agent says: $text');
    case RunFinishedEvent():
      print('Agent finished');
    default:
      break; // Ignore event types this example does not handle
  }
}
```

### runAgentRaw

Executes an agent and returns raw SSE messages without decoding.

```dart
Stream<SseMessage> runAgentRaw(
  String agentId,
  RunAgentInput input, {
  Map<String, String>? headers,
})
```

#### Use Cases

- Custom event processing
- Debugging and logging
- Performance optimization when decoding isn't needed

#### Example

```dart
await for (final message in client.runAgentRaw('agent', input)) {
  print('Raw event: ${message.event}');
  print('Raw data: ${message.data}');
}
```
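
When processing raw messages yourself, a common first step is to inspect the event type before deciding whether to decode the rest. The sketch below assumes that `data` carries a JSON object with a string `type` field, which may not hold for every server or encoding. `RawMessage` is a hypothetical stand-in for `SseMessage` that models only the two fields used above, and `peekEventType` is an illustrative helper, not part of the SDK.

```dart
import 'dart:convert';

/// Hypothetical stand-in for the SDK's `SseMessage`; only the two fields
/// used in the example above are modeled.
class RawMessage {
  const RawMessage({this.event, this.data});

  final String? event;
  final String? data;
}

/// Returns the `type` field of a JSON payload, or null when the payload is
/// missing, is not valid JSON, or is not an object with a string `type`.
/// Assumption: the payload is JSON text with a top-level `type` field.
String? peekEventType(RawMessage message) {
  final data = message.data;
  if (data == null || data.isEmpty) return null;
  try {
    final decoded = jsonDecode(data);
    if (decoded is Map<String, dynamic>) {
      final type = decoded['type'];
      return type is String ? type : null;
    }
  } on FormatException {
    // Not JSON; treat it as a message without a recognizable type.
  }
  return null;
}
```

With the real client, the same check could be applied to each `message` inside the `await for` loop shown above, adapting the field access to the actual `SseMessage` type.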

### cancelAgent

Cancels an active agent execution.

```dart
Future<void> cancelAgent(String agentId)
```

#### Parameters

- `agentId`: The agent ID to cancel

#### Behavior

- Immediately closes the SSE connection
- Cleans up resources
- Causes the event stream to complete

#### Example

```dart
import 'dart:async';

// Start long-running agent
final stream = client.runAgent('analysis-agent', input);

// Listen for events and errors
final subscription = stream.listen(
  (event) => processEvent(event),
  onError: (error) => handleError(error),
);

// Cancel after 10 seconds
Timer(const Duration(seconds: 10), () async {
  await client.cancelAgent('analysis-agent');
  await subscription.cancel();
});
```
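
A fixed deadline is not the only reason to stop a run. As a variation on the example above, the following sketch stops consuming a stream once no event has arrived for a given period, using `Stream.timeout` from `dart:async`. `forEachUntilIdle` is a hypothetical helper name, and the sketch works on any `Stream`, so it makes no assumption about the SDK beyond `runAgent` returning one.

```dart
import 'dart:async';

/// Calls [onEvent] for each event and completes when the source finishes
/// or when no event has arrived within [idleLimit], whichever comes first.
Future<void> forEachUntilIdle<T>(
  Stream<T> events,
  Duration idleLimit,
  void Function(T event) onEvent,
) async {
  // On inactivity, close the derived stream instead of raising
  // a TimeoutException, so the loop below ends normally.
  final bounded = events.timeout(
    idleLimit,
    onTimeout: (sink) => sink.close(),
  );
  await for (final event in bounded) {
    onEvent(event);
  }
}
```

After the helper returns, calling `client.cancelAgent(...)` as in the example above would be a reasonable way to make sure the run itself is stopped.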

### dispose

Cleans up all resources held by the client.

```dart
void dispose()
```

#### Important

- Call this when the client is no longer needed
- Cancels all active streams
- Closes HTTP client connections
- Releases memory resources
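
One way to make sure `dispose()` always runs, even when processing throws, is a `try`/`finally` wrapper. The helper below is a minimal sketch with a hypothetical name (`runThenDispose`); it takes the dispose callback as a parameter so that it does not depend on any SDK type.

```dart
/// Runs [body] and then calls [dispose], whether [body] completes
/// normally or throws. Errors from [body] still propagate to the caller.
Future<R> runThenDispose<R>(
  Future<R> Function() body,
  void Function() dispose,
) async {
  try {
    return await body();
  } finally {
    dispose();
  }
}
```

With the client documented here, a call might look like `await runThenDispose(() => client.runAgent('agent', input).drain<void>(), client.dispose);`.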

## Properties

### config

```dart
final AgUiClientConfig config;
```

The configuration used to initialize the client. Read-only.

### activeStreams

```dart
Map<String, bool> get activeStreams;
```

Returns a map of currently active agent IDs and their status.

## Error Handling

The client throws specific exceptions for different error scenarios:

### AgUiClientError

General client-side errors.

```dart
try {
  await for (final event in client.runAgent('agent', input)) {
    // Process events
  }
} on AgUiClientError catch (e) {
  print('Client error: ${e.message}');
  print('Error code: ${e.code}');
}
```

### NetworkError

Network connectivity issues.

```dart
try {
  await client.runAgent('agent', input).drain<void>();
} on NetworkError catch (e) {
  print('Network error: ${e.message}');
  // Implement retry logic
}
```

### ValidationError

Input validation failures.

```dart
try {
  await client.runAgent('agent', input).drain<void>();
} on ValidationError catch (e) {
  print('Validation failed: ${e.message}');
  print('Failed fields: ${e.fields}');
}
```

### ServerError

Server-side errors (5xx status codes).

```dart
try {
  await client.runAgent('agent', input).drain<void>();
} on ServerError catch (e) {
  print('Server error: ${e.statusCode}');
  print('Message: ${e.message}');
}
```
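
The "Implement retry logic" placeholder in the `NetworkError` handler can be filled in many ways. The following is a minimal sketch of retrying with exponential backoff, written against plain `dart:async` so it stays independent of the SDK. `retryWithBackoff` and its parameters are hypothetical names, and the default attempt count and delay are illustrative values only.

```dart
import 'dart:async';

/// Runs [attempt] up to [maxAttempts] times, doubling the delay after each
/// failure. Only errors accepted by [shouldRetry] are retried; anything
/// else, and the final failure, is rethrown to the caller.
Future<T> retryWithBackoff<T>(
  Future<T> Function() attempt, {
  required bool Function(Object error) shouldRetry,
  int maxAttempts = 3,
  Duration initialDelay = const Duration(milliseconds: 200),
}) async {
  var delay = initialDelay;
  var tries = 0;
  while (true) {
    tries++;
    try {
      return await attempt();
    } catch (error) {
      if (tries >= maxAttempts || !shouldRetry(error)) rethrow;
      await Future<void>.delayed(delay);
      delay *= 2; // 200 ms, 400 ms, 800 ms, ...
    }
  }
}
```

A call such as `retryWithBackoff(() => client.runAgent('agent', input).toList(), shouldRetry: (e) => e is NetworkError)` restarts the whole run on each retry, so events handled during a failed attempt may be seen again. Whether that is acceptable depends on the agent.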

## Advanced Usage

### Custom HTTP Client

Provide a custom HTTP client for advanced scenarios:

```dart
import 'package:http/http.dart' as http;
import 'package:http/retry.dart';

final retryClient = RetryClient(http.Client());

final client = AgUiClient(
  config: AgUiClientConfig(baseUrl: 'http://localhost:8000'),
  httpClient: retryClient,
);
```

### Custom Encoding/Decoding

Implement custom encoders/decoders for specialized formats:

```dart
import 'dart:convert';

class CustomEncoder implements Encoder {
  @override
  List<int> encode(RunAgentInput input) {
    // Serialize the input to JSON, then to UTF-8 bytes
    return utf8.encode(jsonEncode(input.toJson()));
  }
}

class CustomDecoder implements Decoder {
  @override
  BaseEvent decode(List<int> data) {
    // Parse the UTF-8 bytes as a JSON object, then build the event
    final json = jsonDecode(utf8.decode(data)) as Map<String, dynamic>;
    return BaseEvent.fromJson(json);
  }
}

final client = AgUiClient(
  config: config,
  encoder: CustomEncoder(),
  decoder: CustomDecoder(),
);
```

### Stream Transformations

Transform the event stream for specific use cases:

```dart
// Filter only message events
final messageStream = client
    .runAgent('agent', input)
    .where((event) => event is TextMessageEvent);

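// Collect all text into a single string.
// Streams in dart:async have no whereType, so filter and cast explicitly.
final completeText = await client
    .runAgent('agent', input)
    .where((event) => event is TextMessageDeltaEvent)
    .cast<TextMessageDeltaEvent>()
    .map((event) => event.delta)
    .join();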
```
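
Transformations can also carry state. For a UI that shows text as it is generated, it is often more useful to emit the text accumulated so far than to wait for the final string. The sketch below uses a small stand-in event hierarchy (`DemoEvent`, `DemoDelta`, `DemoFinished`), all hypothetical names, so that it runs without the SDK. With the real client, the same shape would apply to `BaseEvent` and `TextMessageDeltaEvent`.

```dart
import 'dart:async';

/// Hypothetical stand-ins for the SDK event types.
sealed class DemoEvent {
  const DemoEvent();
}

class DemoDelta extends DemoEvent {
  const DemoDelta(this.delta);
  final String delta;
}

class DemoFinished extends DemoEvent {
  const DemoFinished();
}

/// Emits the text accumulated so far each time a delta event arrives.
/// Non-delta events are skipped.
Stream<String> runningText(Stream<DemoEvent> events) async* {
  final buffer = StringBuffer();
  await for (final event in events) {
    if (event is DemoDelta) {
      buffer.write(event.delta);
      yield buffer.toString();
    }
  }
}
```

Because `runningText` is an `async*` generator, it only pulls from the source while it has a listener, and cancelling the resulting stream also stops the underlying one.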

### Concurrent Agents

Run multiple agents concurrently:

```dart
final agent1 = client.runAgent('agent1', input1);
final agent2 = client.runAgent('agent2', input2);

// Process both streams
await Future.wait([
  agent1.forEach((event) => processAgent1(event)),
  agent2.forEach((event) => processAgent2(event)),
]);
```
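
When the events of several agents should be handled in one place, an alternative to `Future.wait` is to merge the streams and tag each event with the agent it came from. The following is a minimal sketch using only `dart:async`; `mergeTagged` is a hypothetical helper, and it deliberately ignores pause/resume, so it does not propagate backpressure to the sources.

```dart
import 'dart:async';

/// Merges [sources] into one stream of (id, event) records.
/// The merged stream closes after every source has finished, and
/// cancelling it cancels all source subscriptions.
Stream<(String, T)> mergeTagged<T>(Map<String, Stream<T>> sources) {
  final controller = StreamController<(String, T)>();
  final subscriptions = <StreamSubscription<T>>[];
  var open = sources.length;

  controller.onListen = () {
    if (open == 0) {
      controller.close();
      return;
    }
    sources.forEach((id, stream) {
      subscriptions.add(stream.listen(
        (event) => controller.add((id, event)),
        onError: controller.addError,
        onDone: () {
          // Close the merged stream once the last source is done.
          if (--open == 0) controller.close();
        },
      ));
    });
  };
  controller.onCancel =
      () => Future.wait(subscriptions.map((s) => s.cancel()));

  return controller.stream;
}
```

With the client documented here, usage might look like `mergeTagged({'agent1': client.runAgent('agent1', input1), 'agent2': client.runAgent('agent2', input2)})`, followed by a single `await for` loop that switches on the record's first field.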

## Performance Considerations

### Connection Pooling

The client reuses HTTP connections when possible. For high-throughput scenarios, create one `http.Client` and share it instead of creating a new one per request:

```dart
final httpClient = http.Client();
// Reuse this single client so its connections can be kept alive and shared
final client = AgUiClient(
  config: config,
  httpClient: httpClient,
);
```
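
On platforms where `dart:io` is available, pool limits are set on the underlying `HttpClient`, not on `http.Client` itself. The sketch below shows one way to build such a client; `buildPooledHttpClient` is a hypothetical helper, and the default values are illustrative, not recommendations.

```dart
import 'dart:io';

/// Builds a dart:io HttpClient with explicit connection-pool settings.
/// The defaults here are placeholders; tune them for your workload.
HttpClient buildPooledHttpClient({
  int maxConnectionsPerHost = 8,
  Duration idleTimeout = const Duration(seconds: 30),
}) {
  return HttpClient()
    // Upper bound on simultaneous connections to a single host.
    ..maxConnectionsPerHost = maxConnectionsPerHost
    // How long an unused connection stays open before it is closed.
    ..idleTimeout = idleTimeout;
}
```

To hand this to `AgUiClient`, wrap it with `IOClient` from `package:http/io_client.dart`, for example `httpClient: IOClient(buildPooledHttpClient())`. This does not apply to web builds, where the browser manages connections.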

### Memory Management

For long-running streams:

1. Process events immediately rather than buffering
2. Cancel streams when no longer needed
3. Dispose of the client when done
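
The first two points can be sketched with a plain `await for` loop: each event is handled as it arrives, nothing is buffered, and leaving the loop early cancels the subscription. `countUntil` is a hypothetical helper that works on any `Stream`, so it assumes nothing about the SDK beyond `runAgent` returning one.

```dart
import 'dart:async';

/// Handles events one at a time and stops at the first event for which
/// [isLast] returns true. Returns the number of events seen.
Future<int> countUntil<T>(
  Stream<T> events,
  bool Function(T event) isLast,
) async {
  var seen = 0;
  await for (final event in events) {
    seen++; // Process the event here instead of storing it.
    if (isLast(event)) {
      break; // Leaving the loop cancels the stream subscription.
    }
  }
  return seen;
}
```

With the client documented here, a call might look like `await countUntil(client.runAgent('agent', input), (e) => e is RunFinishedEvent);`.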

### Binary Protocol

The binary protocol is more efficient than JSON for large payloads:

```dart
// Binary protocol is used automatically when supported
final stream = client.runAgent('agent', input);
```

## Testing

Mock the client for unit tests:

```dart
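class MockAgUiClient implements AgUiClient {
  // The override must accept the same optional `headers` parameter
  @override
  Stream<BaseEvent> runAgent(
    String agentId,
    RunAgentInput input, {
    Map<String, String>? headers,
  }) {
    return Stream.fromIterable([
      RunStartedEvent(runId: 'test-run'),
      TextMessageStartedEvent(messageId: 'msg-1'),
      TextMessageDeltaEvent(messageId: 'msg-1', delta: 'Hello'),
      TextMessageFinishedEvent(messageId: 'msg-1'),
      RunFinishedEvent(runId: 'test-run'),
    ]);
  }

  // Members not overridden here are routed to noSuchMethod and throw if called
  @override
  dynamic noSuchMethod(Invocation invocation) => super.noSuchMethod(invocation);
}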
```